---
description: 'Using Queues and Topics for async messaging with Nitric'
---

# Messaging

Nitric provides two common options for scalable, decoupled, asynchronous messaging between services: Topics for publish/subscribe messaging, where new messages are immediately _pushed_ to subscribers, and Queues for _pull_ messaging, where new messages are put on a queue and must be requested.

In some circumstances messages sent to a _Topic_ may also be called _Events_, while messages sent to a _Queue_ may be called _Tasks_. The structures of these messages are very similar, but the delivery and retry mechanisms are different. It can be helpful to refer to these messages differently to assist in understanding the context in which they are used.

## Topics

A topic is a named resource where events can be published. You can think of a topic as a subject that your services are communicating about.

Topics are often the first choice for communication between services, since they offer stateless, scalable and highly decoupled communication.

If you're interested in the architecture, provisioning, or deployment steps, they can be found [here](/architecture/topics).

### Subscriptions

A subscription is a binding between a topic and a service. You can think of it as a channel that notifies your services when something new arrives on the topic.

### Creating a Topic

Nitric allows you to define named topics. When defining a topic, you can give the service permission to publish to it. If no permissions are specified, the topic reference can still be used to create subscribers.

Here's an example of how to create a topic with permissions to publish messages.

<CodeSwitcher tabs>

```javascript !!
import { topic } from '@nitric/sdk'

const userCreatedTopic = topic('user-created').allow('publish')
```

```typescript !!
import { topic } from '@nitric/sdk'

const userCreatedTopic = topic('user-created').allow('publish')
```

```python !!
from nitric.resources import topic
from nitric.application import Nitric

user_created_topic = topic("user-created").allow("publish")

Nitric.run()
```

```go !!
import (
  "github.com/nitrictech/go-sdk/nitric"
  "github.com/nitrictech/go-sdk/nitric/topics"
)

func main() {
  userCreatedTopic := nitric.NewTopic("user-created").Allow(topics.TopicPublish)

  nitric.Run()
}
```

```dart !!
import 'package:nitric_sdk/nitric.dart';

final userCreatedTopic = Nitric.topic("user-created").allow([
  TopicPermission.publish,
]);
```

</CodeSwitcher>

### Publishing a message

To send a message to a topic and notify all subscribers, use the `publish()` method on the topic reference. The service must have permissions to `publish` to the topic.

The below example publishes a message to a topic called `user-created`.

<CodeSwitcher tabs>

```javascript !!
import { topic } from '@nitric/sdk'

const userCreatedTopic = topic('user-created').allow('publish')

await userCreatedTopic.publish({
  email: 'new.user@example.com',
})
```

```typescript !!
import { topic } from '@nitric/sdk'

const userCreatedTopic = topic('user-created').allow('publish')

await userCreatedTopic.publish({
  email: 'new.user@example.com',
})
```

```python !!
from nitric.resources import topic
from nitric.application import Nitric

user_created_topic = topic("user-created").allow("publish")

await user_created_topic.publish({
  "email": "new.user@example.com"
})

Nitric.run()
```

```go !!
import (
  "context"

  "github.com/nitrictech/go-sdk/nitric"
  "github.com/nitrictech/go-sdk/nitric/topics"
)

func main() {
  userCreatedTopic := nitric.NewTopic("user-created").Allow(topics.TopicPublish)

  _ = userCreatedTopic.Publish(context.TODO(), map[string]interface{}{
    "email": "new.user@example.com",
  })

  nitric.Run()
}
```

```dart !!
import 'package:nitric_sdk/nitric.dart';

final userCreatedTopic = Nitric.topic("user-created").allow([
  TopicPermission.publish,
]);

await userCreatedTopic.publish({
  "email": "new.user@example.com",
});
```

</CodeSwitcher>

### Subscribing to a topic

To execute a function when new messages are published you can create subscribers. The delay between publishing a message and a subscriber being executed is usually only a few milliseconds. This makes subscribers perfect for responding to messages as they happen.

The below code shows a subscription that responds to messages when new users are created.

<CodeSwitcher tabs>

```javascript !!
import { topic } from '@nitric/sdk'
import { sendWelcomeEmail } from 'common'

const userCreatedTopic = topic('user-created')

userCreatedTopic.subscribe(async (ctx) => {
  // Extract data from the event payload for processing
  const { email } = ctx.req.json()

  sendWelcomeEmail(email)
})
```

```typescript !!
import { topic } from '@nitric/sdk'
import { sendWelcomeEmail } from 'common'

const userCreatedTopic = topic('user-created')

userCreatedTopic.subscribe(async (ctx) => {
  // Extract data from the event payload for processing
  const { email } = ctx.req.json()

  sendWelcomeEmail(email)
})
```

```python !!
from nitric.resources import topic
from nitric.application import Nitric
from common import send_welcome_email

user_created_topic = topic("user-created")

@user_created_topic.subscribe()
async def updates_sub(ctx):
  email = ctx.req.data['email']

  send_welcome_email(email)

Nitric.run()
```

```go !!
import (
  "github.com/nitrictech/go-sdk/nitric"
  "github.com/nitrictech/go-sdk/nitric/topics"
)

func main() {
  userCreatedTopic := nitric.NewTopic("user-created")

  userCreatedTopic.Subscribe(func(ctx *topics.Ctx) {
    email := ctx.Request.Message()["email"].(string)

    sendWelcomeEmail(email)
  })

  nitric.Run()
}

func sendWelcomeEmail(email string) {
  // TODO: Implement email sending
}
```

```dart !!
import 'package:nitric_sdk/nitric.dart';
import 'package:common/common.dart';

final userCreatedTopic = Nitric.topic("user-created");

userCreatedTopic.subscribe((ctx) async {
  final email = ctx.req.message["email"];

  sendWelcomeEmail(email);

  return ctx;
});
```

</CodeSwitcher>

### Reliable subscribers

If a subscriber encounters an error or is terminated before it finishes processing a message, what happens? Is the event lost?

Nitric deploys topics to cloud services that support "at-least-once delivery". Messages are _usually_ delivered exactly once, in the same order that they're published. However, to prevent lost messages, they're sometimes delivered more than once or out of order.

Retries typically occur when a subscriber doesn't respond successfully, such as when an unhandled exception occurs. You'll usually want to ensure messages aren't accidentally processed twice or only partially processed, which could leave the system in an unexpected state.

Building atomic publishers and idempotent subscribers can solve this.

#### Atomic publishers

Your publishers need to update your database _and_ publish associated events. If a database update fails, the events should _never_ be sent. If the database update succeeds, the events should _always_ publish. The two shouldn't occur independently (i.e. one shouldn't fail while the other succeeds).

One solution to this problem is the [Transactional Outbox Pattern](https://microservices.io/patterns/data/transactional-outbox.html).
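
As a rough illustration of the pattern, the sketch below uses a hypothetical in-memory `InMemoryDb` in place of a real transactional database. The names `createUser`, `relayOutbox`, and the `publish` callback are assumptions made for this example and are not part of the Nitric SDK. The callback is synchronous here for brevity, whereas a real call such as `userCreatedTopic.publish()` is asynchronous.

```typescript
// Hypothetical in-memory stand-ins for a database that supports transactions.
type User = { id: string; email: string }
type OutboxEntry = { id: string; topic: string; payload: unknown; published: boolean }

class InMemoryDb {
  users: User[] = []
  outbox: OutboxEntry[] = []

  // Apply all writes or none: work on copies and commit only if fn succeeds.
  transaction(fn: (tx: { users: User[]; outbox: OutboxEntry[] }) => void): void {
    const tx = { users: [...this.users], outbox: [...this.outbox] }
    fn(tx) // if this throws, this.users and this.outbox are left untouched
    this.users = tx.users
    this.outbox = tx.outbox
  }
}

// Step 1: write the business record and the pending event in ONE transaction.
function createUser(db: InMemoryDb, user: User): void {
  db.transaction((tx) => {
    if (tx.users.some((u) => u.id === user.id)) {
      throw new Error(`user ${user.id} already exists`)
    }
    tx.users.push(user)
    tx.outbox.push({
      id: `user-created:${user.id}`,
      topic: 'user-created',
      payload: { email: user.email },
      published: false,
    })
  })
}

// Step 2: a separate relay publishes pending entries and marks them as sent.
// `publish` is a placeholder for whatever actually sends the event.
function relayOutbox(
  db: InMemoryDb,
  publish: (topic: string, payload: unknown) => void,
): number {
  let sent = 0
  for (const entry of db.outbox) {
    if (entry.published) continue
    publish(entry.topic, entry.payload) // if this throws, the entry stays pending
    entry.published = true
    sent++
  }
  return sent
}
```

If the relay stops after publishing but before marking an entry as published, the event is sent again on the next run. This is why the pattern is normally paired with idempotent subscribers.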

#### Idempotent subscribers

Messages from a topic can be delivered more than once, but they should typically only be _processed_ once. To do this your subscribers need to identify and disregard duplicate events.

Usually, checking for duplicate payloads or IDs is enough. When you receive an event you've seen before, don't process it; skip straight to returning a `success` response from your subscriber.

<CodeSwitcher tabs>

```javascript !!
import { topic } from '@nitric/sdk'
import { isDuplicate } from '../common'

const updates = topic('updates')

updates.subscribe((ctx) => {
  if (isDuplicate(ctx.req)) {
    return ctx
  }
  // not a duplicate, process the event
  // ...
})
```

```typescript !!
import { topic } from '@nitric/sdk'
import { isDuplicate } from '../common'

const updates = topic('updates')

updates.subscribe((ctx) => {
  if (isDuplicate(ctx.req)) {
    return ctx
  }
  // not a duplicate, process the event
  // ...
})
```

```python !!
from nitric.resources import topic
from nitric.application import Nitric
from common import is_duplicate

updates = topic("updates")

@updates.subscribe()
async def process_update(ctx):
  if is_duplicate(ctx.req):
    return ctx

  # not a duplicate, process the event
  # ...

Nitric.run()
```

```go !!
import (
  "github.com/nitrictech/go-sdk/nitric"
  "github.com/nitrictech/go-sdk/nitric/topics"
  "github.com/your-org/your-project/common"
)

func main() {
  updates := nitric.NewTopic("updates")

  updates.Subscribe(func(ctx *topics.Ctx) {
    if common.IsDuplicate(ctx.Request) {
      return
    }

    // not a duplicate, process the event
    // ...
  })

  nitric.Run()
}
```

```dart !!
import 'package:nitric_sdk/nitric.dart';
import 'package:common/common.dart';

final updates = Nitric.topic("updates");

updates.subscribe((ctx) async {
  if (isDuplicate(ctx.req)) {
    return ctx;
  }

  // Not a duplicate, process the event
  // ...

  return ctx;
});
```

</CodeSwitcher>

<Note>
  If you're checking for duplicate IDs, ensure publishers can't resend failed
  events with new IDs.
</Note>

You can read more about idempotent subscribers and patterns for implementing them [here](https://microservices.io/post/microservices/patterns/2020/10/16/idempotent-consumer.html).
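
The subscriber examples above import an `isDuplicate` helper without showing it. One possible shape for such a helper is sketched below, assuming publishers include a stable `id` field in each payload. The `EventPayload` type, `markProcessed`, and `handleOnce` are hypothetical names for this example, and the in-memory `Set` would likely need to be replaced with a shared, durable store in a deployed service, since service instances don't share memory.

```typescript
// Hypothetical event shape: assumes publishers include a stable `id`.
type EventPayload = { id: string; [key: string]: unknown }

// In-memory record of processed IDs, for illustration only.
const processedIds = new Set<string>()

function isDuplicate(payload: EventPayload): boolean {
  return processedIds.has(payload.id)
}

function markProcessed(payload: EventPayload): void {
  processedIds.add(payload.id)
}

// Run the handler only for events that haven't been processed before.
// Returns true if the handler ran, false if the event was skipped.
function handleOnce(
  payload: EventPayload,
  handler: (p: EventPayload) => void,
): boolean {
  if (isDuplicate(payload)) {
    return false // seen before: skip processing and report success
  }
  handler(payload) // if this throws, the ID is not recorded, so a retry can run
  markProcessed(payload)
  return true
}
```

Recording the ID only after the handler succeeds means a failed attempt can be retried. It also means a crash between processing and recording could allow the event to be processed again, so stricter guarantees would need the handler's side effects and the ID to be written together.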

## Queues

Queues are another option for asynchronous messaging. Unlike [topics](#topics), messages sent to a queue won't automatically trigger services to process them. Instead, services dequeue messages by requesting them.

This makes queues ideal for batch workloads, often paired with [schedules](/schedules).

If you're interested in the architecture, provisioning, or deployment steps, they can be found [here](/architecture/queues).

<Note>
  Queue disambiguation: Some systems and cloud providers use the term "queue" to
  refer to a messaging broker with features akin to a topic (i.e. they can push
  messages to subscribers). In Nitric, a queue is a resource that messages are
  sent to and pulled from, while a topic is a resource that messages are sent to
  and pushed from.
</Note>

### Creating a Queue

Nitric allows you to define named queues. When defining queues, you can give the service permissions for enqueueing and dequeueing messages.

Here's an example of how to create a queue with permissions for enqueueing and dequeueing.

<CodeSwitcher tabs>

```javascript !!
import { queue } from '@nitric/sdk'

const transactionQueue = queue('transactions').allow('enqueue', 'dequeue')
```

```typescript !!
import { queue } from '@nitric/sdk'

const transactionQueue = queue('transactions').allow('enqueue', 'dequeue')
```

```python !!
from nitric.resources import queue
from nitric.application import Nitric

transaction_queue = queue("transactions").allow("enqueue", "dequeue")

Nitric.run()
```

```go !!
import (
  "github.com/nitrictech/go-sdk/nitric"
  "github.com/nitrictech/go-sdk/nitric/queues"
)

func main() {
  transactionQueue := nitric.NewQueue("transactions").Allow(queues.QueueEnqueue, queues.QueueDequeue)

  nitric.Run()
}
```

```dart !!
import 'package:nitric_sdk/nitric.dart';

final transactionQueue = Nitric.queue("transactions").allow([
  QueuePermission.enqueue,
  QueuePermission.dequeue,
]);
```

</CodeSwitcher>

### Enqueue Messages

To send a message to a queue, use the `enqueue()` method on the queue reference. The service must have permissions to `enqueue` to the queue.

The below example sends a message to a queue called `transactions`.

<CodeSwitcher tabs>

```javascript !!
import { queue } from '@nitric/sdk'

const transactionQueue = queue('transactions').allow('enqueue')

await transactionQueue.enqueue({
  message: 'hello world',
})
```

```typescript !!
import { queue } from '@nitric/sdk'

const transactionQueue = queue('transactions').allow('enqueue')

await transactionQueue.enqueue({
  message: 'hello world',
})
```

```python !!
from nitric.resources import queue
from nitric.application import Nitric

transaction_queue = queue("transactions").allow("enqueue")

await transaction_queue.enqueue({
  "message": "hello world"
})

Nitric.run()
```

```go !!
import (
  "context"

  "github.com/nitrictech/go-sdk/nitric"
  "github.com/nitrictech/go-sdk/nitric/queues"
)

func main() {
  transactionQueue := nitric.NewQueue("transactions").Allow(queues.QueueEnqueue)

  messages := []map[string]interface{}{
    {"message": "hello world"},
  }

  transactionQueue.Enqueue(context.TODO(), messages)

  nitric.Run()
}
```

```dart !!
import 'package:nitric_sdk/nitric.dart';

final transactionQueue = Nitric.queue("transactions").allow([
  QueuePermission.enqueue,
]);

final messages = [{ "message": "hello world" }];

await transactionQueue.enqueue(messages);
```

</CodeSwitcher>

Messages can also be sent in batches by providing an array.

<CodeSwitcher tabs>

```javascript !!
import { queue } from '@nitric/sdk'

const transactionQueue = queue('transactions').allow('enqueue')

await transactionQueue.enqueue([
  {
    message: 'batch task 1',
  },
  {
    message: 'batch task 2',
  },
])
```

```typescript !!
import { queue } from '@nitric/sdk'

const transactionQueue = queue('transactions').allow('enqueue')

await transactionQueue.enqueue([
  {
    message: 'batch task 1',
  },
  {
    message: 'batch task 2',
  },
])
```

```python !!
from nitric.resources import queue
from nitric.application import Nitric

transaction_queue = queue("transactions").allow("enqueue")

await transaction_queue.enqueue([
  {
    "message": "batch task 1"
  }, {
    "message": "batch task 2"
  }
])

Nitric.run()
```

```go !!
import (
  "context"

  "github.com/nitrictech/go-sdk/nitric"
  "github.com/nitrictech/go-sdk/nitric/queues"
)

func main() {
  transactionQueue := nitric.NewQueue("transactions").Allow(queues.QueueEnqueue)

  messages := []map[string]interface{}{
    {"message": "batch task 1"},
    {"message": "batch task 2"},
  }

  transactionQueue.Enqueue(context.TODO(), messages)

  nitric.Run()
}
```

```dart !!
import 'package:nitric_sdk/nitric.dart';

final transactionQueue = Nitric.queue("transactions").allow([
  QueuePermission.enqueue,
]);

await transactionQueue.enqueue([
  {
    "message": "batch task 1"
  }, {
    "message": "batch task 2"
  }
]);
```

</CodeSwitcher>

### Dequeueing and Acknowledging Messages

When you dequeue messages they are not immediately deleted from the queue. Instead, they are leased, which means they are temporarily hidden from other services until the lease expires.

To ensure proper handling, your code should mark a dequeued message as `complete` after successfully processing it. This action permanently removes the message from the queue.

If a lease expires before a dequeued message is marked as complete, the message will reappear in the queue and can be dequeued again. This mechanism prevents messages from getting lost in case of failures: if your service encounters an error or is terminated before it finishes processing a dequeued message, the message will automatically reappear in the queue, ready to be processed again.

By following this approach, you can ensure reliable message processing and minimize the chances of losing data in failure scenarios.
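
The lease behaviour described above can be illustrated with a small in-memory simulation. This is only a sketch of the concept, not how Nitric or the underlying cloud services implement it. The `LeaseQueue` class, its `leaseMs` setting, and the explicit `now` parameter (used so the example is deterministic) are all assumptions made for this example.

```typescript
// A message handed to a consumer while it is leased.
type Leased<T> = { id: number; payload: T }

// In-memory simulation of lease-based dequeueing, for illustration only.
class LeaseQueue<T> {
  private readonly leaseMs: number
  private nextId = 1
  // A message is visible when its leaseExpiresAt is at or before the current time.
  private items: { id: number; payload: T; leaseExpiresAt: number }[] = []

  constructor(leaseMs: number) {
    this.leaseMs = leaseMs
  }

  enqueue(payload: T): void {
    this.items.push({ id: this.nextId++, payload, leaseExpiresAt: 0 })
  }

  // Lease up to `depth` visible messages, hiding them until the lease expires.
  dequeue(depth: number, now: number): Leased<T>[] {
    const visible = this.items
      .filter((m) => m.leaseExpiresAt <= now)
      .slice(0, depth)
    for (const m of visible) {
      m.leaseExpiresAt = now + this.leaseMs
    }
    return visible.map((m) => ({ id: m.id, payload: m.payload }))
  }

  // Permanently remove a message once it has been processed.
  complete(id: number): void {
    this.items = this.items.filter((m) => m.id !== id)
  }
}
```

In this simulation, a message that is dequeued but never completed is returned again by a later `dequeue()` call once `leaseMs` has elapsed, while a completed message is never returned again.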

The below example dequeues 10 messages using the `dequeue()` method, then acknowledges them as complete using the `complete()` method. You'll note that the service has permissions to dequeue messages.

<CodeSwitcher tabs>

```javascript !!
import { queue } from '@nitric/sdk'

const transactionQueue = queue('transactions').allow('dequeue')

const tasks = await transactionQueue.dequeue(10)

for (let task of tasks) {
  // process your task's data
  console.log(task.message)

  // acknowledge when the task is complete
  await task.complete()
}
```

```typescript !!
import { queue } from '@nitric/sdk'

const transactionQueue = queue('transactions').allow('dequeue')

const tasks = await transactionQueue.dequeue(10)

for (let task of tasks) {
  // process your task's data
  console.log(task.message)

  // acknowledge when the task is complete
  await task.complete()
}
```

```python !!
from nitric.resources import queue

transaction_queue = queue("transactions").allow("dequeue")

tasks = await transaction_queue.dequeue(10)

for task in tasks:
  # process your task's data
  print(task.message)

  # acknowledge when the task is complete
  await task.complete()
```

```go !!
import (
  "context"
  "fmt"

  "github.com/nitrictech/go-sdk/nitric"
  "github.com/nitrictech/go-sdk/nitric/queues"
)

func main() {
  transactionQueue := nitric.NewQueue("transactions").Allow(queues.QueueDequeue)

  messages, _ := transactionQueue.Dequeue(context.TODO(), 10)

  for _, message := range messages {
    // process your message's data
    fmt.Println(message.Message())

    // acknowledge when the message is complete
    message.Complete(context.TODO())
  }

  nitric.Run()
}
```

```dart !!
import 'package:nitric_sdk/nitric.dart';

final transactionQueue = Nitric.queue("transactions").allow([
  QueuePermission.dequeue,
]);

final messages = await transactionQueue.dequeue(10);

await Future.wait(messages.map((message) async {
  // process your message's data
  print(message.message);

  // acknowledge when the message is complete
  await message.complete();
}));
```

</CodeSwitcher>

## Choosing between queues and topics

It's common to ask when to use a queue or a topic. From a publisher's point of view, queues and topics are almost identical. The difference is primarily on the receiver/subscriber side. Topics push new messages to their subscribers, immediately spinning up workers to process them, while queues rely on the receiver to ask for new messages to process.

For these reasons, we usually default to Topics. Queues are more suitable for batch workloads or situations where there are occasional surges of requests that can be processed at a later time.

## Cloud Service Mapping

Each cloud provider comes with a set of default services used when deploying resources. You can find the default services for each cloud provider below.

### Topics

- [AWS](/providers/mappings/aws/topics)
- [Azure](/providers/mappings/azure/topics)
- [Google Cloud](/providers/mappings/gcp/topics)

### Queues

- [AWS](/providers/mappings/aws/queues)
- [Azure](/providers/mappings/azure/queues)
- [Google Cloud](/providers/mappings/gcp/queues)
